package org.codehaus.activemq.store.bdbn;

import com.sleepycat.db.Db;
import com.sleepycat.db.DbException;
import com.sleepycat.db.DbTxn;
import com.sleepycat.db.Dbt;
import java.io.IOException;
import javax.jms.JMSException;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

public class BDbMessageStore
  implements MessageStore
{
  private static final int SUCCESS = 0;
  private Db database;
  private WireFormat wireFormat;
  private MessageContainer container;

  public void setMessageContainer(MessageContainer container)
  {
    this.container = container;
  }

  public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
    String messageID = message.getJMSMessageID();
    try {
      Dbt key = createKey(messageID);
      Dbt value = new Dbt(asBytes(message));
      this.database.put(BDbHelper.getTransaction(), key, value, 0);
      return new MessageIdentity(messageID);
    }
    catch (DbException e) {
      throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
    } catch (IOException e) {
    	   throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
    	   
    }
  }

  public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException
  {
    String messageID = identity.getMessageID();
    ActiveMQMessage answer = null;
    try {
      Dbt key = createKey(messageID);
      Dbt value = new Dbt();
      if (this.database.get(null, key, value, 0) == 0) {
        answer = extractMessage(value);
      }
      return answer;
    }
    catch (DbException e) {
      throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
    } catch (IOException e) {
    	   throw JMSExceptionHelper.newJMSException("Failed to peek next message after: " + messageID + " from container: " + e, e);
    	   
    }
  }

  public void removeMessage(MessageIdentity identity, MessageAck ack)
    throws JMSException
  {
    DbTxn transaction = null;
    String messageID = identity.getMessageID();
    try {
      this.database.delete(BDbHelper.getTransaction(), createKey(messageID), 0);
    }
    catch (DbException e) {
      throw JMSExceptionHelper.newJMSException("Failed to delete message: " + messageID + " from container: " + e, e);
    }
  }

  public void recover(QueueMessageContainer container) throws JMSException
  {
  }

  public void start() throws JMSException
  {
  }

  public void stop() throws JMSException {
    try {
      this.database.close(0);
    }
    catch (DbException e) {
      throw JMSExceptionHelper.newJMSException("Failed to close MessageStore. Reason: " + e, e);
    }
  }

  protected Dbt createKey(String messageID)
  {
    Dbt key = new Dbt(asBytes(messageID));
    return key;
  }

  protected ActiveMQMessage extractMessage(Dbt value) throws IOException
  {
    synchronized (this.wireFormat) {
      return (ActiveMQMessage)this.wireFormat.fromBytes(value.getData());
    }
  }

  protected byte[] asBytes(ActiveMQMessage message)
    throws IOException, JMSException
  {
    synchronized (this.wireFormat) {
      return this.wireFormat.toBytes(message);
    }
  }

  protected byte[] asBytes(String messageID) {
    return messageID.getBytes();
  }
}